home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2008 February / PCWFEB08.iso / Software / Freeware / Miro 1.0 / Miro_Installer.exe / xulrunner / python / dl_daemon / natpunch.py < prev    next >
Encoding:
Python Source  |  2007-11-12  |  7.5 KB  |  255 lines

  1. # Written by John Hoffman
  2. # derived from NATPortMapping.py by Yejun Yang
  3. # and from example code by Myers Carpenter
  4. # see LICENSE.txt for license information
  5.  
  6. import socket
  7. from traceback import print_exc
  8. from subnetparse import IP_List
  9. from clock import clock
  10. from bittornadoinit import createPeerID
  11. try:
  12.     True
  13. except:
  14.     True = 1
  15.     False = 0
  16.  
  17. DEBUG = False
  18.  
  19. EXPIRE_CACHE = 30 # seconds
  20. ID = "BT-"+createPeerID()[-4:]
  21.  
  22. try:
  23.     import pythoncom, win32com.client
  24.     _supported = 1
  25. except ImportError:
  26.     _supported = 0
  27.  
  28.  
  29.  
  30. class _UPnP1:   # derived from Myers Carpenter's code
  31.                 # seems to use the machine's local UPnP
  32.                 # system for its operation.  Runs fairly fast
  33.  
  34.     def __init__(self):
  35.         self.map = None
  36.         self.last_got_map = -10e10
  37.  
  38.     def _get_map(self):
  39.         if self.last_got_map + EXPIRE_CACHE < clock():
  40.             try:
  41.                 dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
  42.                 self.map = dispatcher.StaticPortMappingCollection
  43.                 self.last_got_map = clock()
  44.             except:
  45.                 self.map = None
  46.         return self.map
  47.  
  48.     def test(self):
  49.         try:
  50.             assert self._get_map()     # make sure a map was found
  51.             success = True
  52.         except:
  53.             success = False
  54.         return success
  55.  
  56.  
  57.     def open(self, ip, p):
  58.         map = self._get_map()
  59.         try:
  60.             map.Add(p,'TCP',p,ip,True,ID)
  61.             if DEBUG:
  62.                 print 'port opened: '+ip+':'+str(p)
  63.             success = True
  64.         except:
  65.             if DEBUG:
  66.                 print "COULDN'T OPEN "+str(p)
  67.                 print_exc()
  68.             success = False
  69.         return success
  70.  
  71.  
  72.     def close(self, p):
  73.         map = self._get_map()
  74.         try:
  75.             map.Remove(p,'TCP')
  76.             success = True
  77.             if DEBUG:
  78.                 print 'port closed: '+str(p)
  79.         except:
  80.             if DEBUG:
  81.                 print 'ERROR CLOSING '+str(p)
  82.                 print_exc()
  83.             success = False
  84.         return success
  85.  
  86.  
  87.     def clean(self, retry = False):
  88.         if not _supported:
  89.             return
  90.         try:
  91.             map = self._get_map()
  92.             ports_in_use = []
  93.             for i in xrange(len(map)):
  94.                 try:
  95.                     mapping = map[i]
  96.                     port = mapping.ExternalPort
  97.                     prot = str(mapping.Protocol).lower()
  98.                     desc = str(mapping.Description).lower()
  99.                 except:
  100.                     port = None
  101.                 if port and prot == 'tcp' and desc[:3] == 'bt-':
  102.                     ports_in_use.append(port)
  103.             success = True
  104.             for port in ports_in_use:
  105.                 try:
  106.                     map.Remove(port,'TCP')
  107.                 except:
  108.                     success = False
  109.             if not success and not retry:
  110.                 self.clean(retry = True)
  111.         except:
  112.             pass
  113.  
  114.  
  115. class _UPnP2:   # derived from Yejun Yang's code
  116.                 # apparently does a direct search for UPnP hardware
  117.                 # may work in some cases where _UPnP1 won't, but is slow
  118.                 # still need to implement "clean" method
  119.  
  120.     def __init__(self):
  121.         self.services = None
  122.         self.last_got_services = -10e10
  123.                            
  124.     def _get_services(self):
  125.         if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
  126.             self.services = []
  127.             try:
  128.                 f=win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
  129.                 for t in ( "urn:schemas-upnp-org:service:WANIPConnection:1",
  130.                            "urn:schemas-upnp-org:service:WANPPPConnection:1" ):
  131.                     try:
  132.                         conns = f.FindByType(t,0)
  133.                         for c in xrange(len(conns)):
  134.                             try:
  135.                                 svcs = conns[c].Services
  136.                                 for s in xrange(len(svcs)):
  137.                                     try:
  138.                                         self.services.append(svcs[s])
  139.                                     except:
  140.                                         pass
  141.                             except:
  142.                                 pass
  143.                     except:
  144.                         pass
  145.             except:
  146.                 pass
  147.             self.last_got_services = clock()
  148.         return self.services
  149.  
  150.     def test(self):
  151.         try:
  152.             assert self._get_services()    # make sure some services can be found
  153.             success = True
  154.         except:
  155.             success = False
  156.         return success
  157.  
  158.  
  159.     def open(self, ip, p):
  160.         svcs = self._get_services()
  161.         success = False
  162.         for s in svcs:
  163.             try:
  164.                 s.InvokeAction('AddPortMapping',['',p,'TCP',p,ip,True,ID,0],'')
  165.                 success = True
  166.             except:
  167.                 pass
  168.         if DEBUG and not success:
  169.             print "COULDN'T OPEN "+str(p)
  170.             print_exc()
  171.         return success
  172.  
  173.  
  174.     def close(self, p):
  175.         svcs = self._get_services()
  176.         success = False
  177.         for s in svcs:
  178.             try:
  179.                 s.InvokeAction('DeletePortMapping', ['',p,'TCP'], '')
  180.                 success = True
  181.             except:
  182.                 pass
  183.         if DEBUG and not success:
  184.             print "COULDN'T OPEN "+str(p)
  185.             print_exc()
  186.         return success
  187.  
  188.  
  189. class _UPnP:    # master holding class
  190.     def __init__(self):
  191.         self.upnp1 = _UPnP1()
  192.         self.upnp2 = _UPnP2()
  193.         self.upnplist = (None, self.upnp1, self.upnp2)
  194.         self.upnp = None
  195.         self.local_ip = None
  196.         self.last_got_ip = -10e10
  197.         
  198.     def get_ip(self):
  199.         if self.last_got_ip + EXPIRE_CACHE < clock():
  200.             local_ips = IP_List()
  201.             local_ips.set_intranet_addresses()
  202.             try:
  203.                 for info in socket.getaddrinfo(socket.gethostname(),0):
  204.                             # exception if socket library isn't recent
  205.                     self.local_ip = info[4][0]
  206.                     if local_ips.includes(self.local_ip):
  207.                         self.last_got_ip = clock()
  208.                         if DEBUG:
  209.                             print 'Local IP found: '+self.local_ip
  210.                         break
  211.                 else:
  212.                     raise ValueError('couldn\'t find intranet IP')
  213.             except:
  214.                 self.local_ip = None
  215.                 if DEBUG:
  216.                     print 'Error finding local IP'
  217.                     print_exc()
  218.         return self.local_ip
  219.  
  220.     def test(self, upnp_type):
  221.         if DEBUG:
  222.             print 'testing UPnP type '+str(upnp_type)
  223.         if not upnp_type or not _supported or self.get_ip() is None:
  224.             if DEBUG:
  225.                 print 'not supported'
  226.             return 0
  227.         pythoncom.CoInitialize()                # leave initialized
  228.         self.upnp = self.upnplist[upnp_type]    # cache this
  229.         if self.upnp.test():
  230.             if DEBUG:
  231.                 print 'ok'
  232.             return upnp_type
  233.         if DEBUG:
  234.             print 'tested bad'
  235.         return 0
  236.  
  237.     def open(self, p):
  238.         assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
  239.         return self.upnp.open(self.get_ip(), p)
  240.  
  241.     def close(self, p):
  242.         assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
  243.         return self.upnp.close(p)
  244.  
  245.     def clean(self):
  246.         return self.upnp1.clean()
  247.  
  248. _upnp_ = _UPnP()
  249.  
  250. UPnP_test = _upnp_.test
  251. UPnP_open_port = _upnp_.open
  252. UPnP_close_port = _upnp_.close
  253. UPnP_reset = _upnp_.clean
  254.  
  255.